package org.voovan.korla.message;

import org.voovan.korla.exception.KorlaException;
import org.voovan.tools.log.Logger;
import org.voovan.tools.reflect.TReflect;

import java.util.concurrent.TimeoutException;

import static org.voovan.korla.KorlaStatic.*;

/**
 * 消息类
 *
 * @author: helyho
 * korla Framework.
 * WebSite: https://github.com/helyho/korla
 * Licence: Apache v2 License
 */
public class Msg {
    public static int MSG_TYPE_NORMAL   = 1; //响应消息
    public static int MSG_TYPE_REG      = 2; //注册消息

    public static int MSG_SOURCE_SND = 0; //发送来源
    public static int MSG_SOURCE_RCV = 1; //接收来源

    private Long id = UNIQUE_ID.nextNumber();
    private String channel;
    private Integer type;
    private Integer source;
    private Object sliceKey; //用于分片的数据
    private boolean forceFlush = false;

    //1. Provider: 是否缓存幂等数据, 缓存后供消费信息检查幂等是否存在
    //2. Consumer: 是否检查幂等数据, 可以用以绕过幂等直达业务逻辑数据的返回
    private boolean idempotence = DEFAULT_IDEMPOTENCE;

    private Long timestamp = 0L;

    //这里不适用 transient 是应为要持久化, 通过在序列化过滤器中屏蔽 Callback 对象
    private Callback callBack;
    private transient volatile boolean done = false;
    private transient volatile Msg responsMsg = null;

    private transient Object lock = new Object();

    public Msg() {
        this.type = MSG_TYPE_NORMAL;
    }

    public Msg(Long id) {
        this.type = MSG_TYPE_NORMAL;
        this.id = id;
    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getChannel() {
        return channel;
    }

    public void setChannel(String channel) {
        this.channel = channel;
    }

    public Integer getType() {
        return type;
    }

    public void setType(Integer type) {
        this.type = type;
    }

    public Integer getSource() {
        return source;
    }

    public void setSource(Integer source) {
        this.source = source;
    }

    public Object getSliceKey() {
        return sliceKey;
    }

    public void setSliceKey(Object sliceKey) {
        this.sliceKey = sliceKey;
    }

    public Boolean isIdempotence() {
        return idempotence;
    }

    public void setIdempotence(Boolean idempotence) {
        this.idempotence = idempotence;
    }

    public boolean isExpire() {
        if(System.currentTimeMillis() - timestamp > MESSAGE_EXPIRE * 1000) {
            return true;
        } else {
            return false;
        }
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    public void updateTimestamp() {
        this.timestamp = System.currentTimeMillis();
    }

    public void setRespons(Msg responsMsg) {
        this.responsMsg = responsMsg;
        synchronized (lock) {
            setDone(true);
            lock.notify();
        }
    }

    public Msg getResponse(long timeoutMillis) throws TimeoutException {
        synchronized (lock) {
            if(!isDone()) {
                try {
                    lock.wait(timeoutMillis);
                    if (!isDone()) {
                        throw new TimeoutException("Msg.getResponse timeout");
                    }
                } catch (InterruptedException e) {
                    Logger.error("[Korla] Msg.getResponse has error", e);
                }
            }
        }

        removeFromCache(this);
        return responsMsg;
    }

    public Msg getResponse() {
        if(!isDone()) {
            removeFromCache(this);
        }
        return responsMsg;
    }

    public Callback getCallBack() {
        return callBack;
    }

    public void setCallBack(Callback callBack) {
        this.callBack = callBack;
    }

    public boolean isDone() {
        return done;
    }

    public void setDone(boolean done) {
        this.done = done;
    }

    public boolean isForceFlush() {
        return forceFlush;
    }

    public void setForceFlush(boolean forceFlush) {
        this.forceFlush = forceFlush;
    }

    public <T extends Msg> T initAnswer(T msg) {
        msg.setId(this.id);
        msg.setType(Msg.MSG_TYPE_NORMAL);
        return msg;
    }

    public <T extends Msg> T buildAnswer() {
        return (T)buildAnswer(this.getClass());
    }

    public <T extends Msg> T buildAnswer(Class<T> msgClass) {
        try {
            T answerMsg = TReflect.newInstance(msgClass);
            return initAnswer(answerMsg);
        } catch (ReflectiveOperationException e) {
            throw new KorlaException("Create answer Msg error", e);
        }
    }

    /**
     * 是否是正常消息
     * @return true: 是, false: 否
     */
    public boolean isNormalMsg() {
        return this.type == Msg.MSG_TYPE_NORMAL;
    }

    /**
     * 是否是注册消息
     * @return true: 是, false: 否
     */
    public boolean isRegMsg() {
        return this.type == Msg.MSG_TYPE_REG;
    }

    /**
     * 是否是同步读取的消息
     * @return true: 是, false: 否
     */
    public boolean isSyncMsg() {
        return this.getCallBack() instanceof SyncCallback;
    }

    public static void removeFromCache(Msg msg) {
        if (msg.done) {
            MESSAGE.remove(msg.getId());
            MESSAGE_CACHE.remove(msg.getId());
        }
    }
}
